contents

Kafka Connect는 아파치 카프카와 다른 시스템 간에 데이터를 안정적으로 스트리밍하기 위한 프레임워크입니다. 이는 카프카에 포함된 도구로, 커스텀 코드를 작성하지 않고도 대규모 데이터셋을 카프카 안팎으로 이동시키는 간단하고, 확장 가능하며, 내결함성 있는 방법을 제공합니다.

Kafka Connect를 데이터를 위한 강력한 만능 어댑터 🔌 세트라고 생각할 수 있습니다. 카프카에 연결하려는 모든 데이터베이스나 애플리케이션에 대해 매번 커스텀 커넥터를 만드는 대신, Kafka Connect의 재사용 가능한 구성 요소를 사용합니다.


Kafka Connect가 해결하는 문제

Kafka Connect 이전에는 PostgreSQL 데이터베이스의 데이터를 카프카 토픽으로 가져오려면 커스텀 애플리케이션(카프카 프로듀서)을 작성해야 했습니다. 그런 다음 그 토픽의 데이터를 Elasticsearch로 옮기려면 또 다른 커스텀 애플리케이션(카프카 컨슈머)을 작성해야 했습니다.

이 접근 방식에는 몇 가지 문제가 있습니다.

Kafka Connect는 이 모든 어려운 부분을 처리하는 표준화된 프레임워크를 제공하여, 코드가 아닌 설정에 집중할 수 있게 함으로써 이러한 문제를 해결합니다.


핵심 아키텍처 및 구성 요소 ⚙️

Kafka Connect는 카프카 브로커와는 별개의 프로세스(또는 프로세스 클러스터)로 실행됩니다. 명확한 계층 구조를 가집니다.


주로 설정하는 구성 요소

데이터 파이프라인을 설정할 때 주로 다음 세 가지 구성 요소를 다루게 됩니다.

1. 커넥터 (Connectors)

커넥터는 외부 데이터 저장소와 인터페이스하는 플러그인입니다. 두 가지 유형이 있습니다.

Confluent Hub와 같은 플랫폼에는 거의 모든 인기 있는 데이터 시스템(데이터베이스, 클라우드 스토리지, SaaS API 등)을 위한 방대한 양의 사전 구축된 커넥터가 있습니다.

2. 컨버터 (Converters)

카프카 토픽은 원시 바이트만 저장합니다. 컨버터는 데이터가 Kafka Connect를 드나들 때 데이터의 직렬화 및 역직렬화를 처리하는 중요한 구성 요소입니다. 데이터 형식을 정의합니다.

3. 트랜스폼 (Single Message Transforms - SMTs)

트랜스폼은 별도의 스트림 처리 애플리케이션 없이 파이프라인을 통과하는 메시지에 대해 간단한 인라인 수정을 수행할 수 있는 매우 유용한 기능입니다.


운영 모드

Kafka Connect는 두 가지 모드로 실행할 수 있습니다.

단독 모드 (Standalone Mode)

분산 모드 (Distributed Mode) 🚀


실제 예시: JDBC 소스 커넥터

MySQL 데이터베이스의 users 테이블에서 모든 신규 사용자를 카프카 토픽으로 스트리밍하고 싶다고 가정해 봅시다.

  1. Kafka Connect를 분산 모드로 실행합니다.
  2. 새로운 JdbcSourceConnector를 정의하는 JSON 설정을 REST API에 제출합니다.
  3. 이 설정에는 다음이 포함됩니다.
    • 커넥터 클래스 (io.confluent.connect.jdbc.JdbcSourceConnector).
    • 데이터베이스 연결 정보 (URL, 사용자, 암호).
    • 모니터링할 테이블 (table.whitelist: "users").
    • 새로운 행을 감지하는 모드 (예: mode: "timestamp", updated_at 열을 사용하여 신규 또는 업데이트된 행 찾기).
    • 데이터를 쓸 카프카 토픽 (topic.prefix).
  4. 다음 과정: Kafka Connect 클러스터는 이 커넥터를 위한 태스크(또는 여러 태스크)를 생성합니다. 이 태스크는 주기적으로 users 테이블을 쿼리하고(SELECT * FROM users WHERE updated_at > ?), 각 새로운 행을 설정된 형식(예: Avro)으로 변환한 후, 처리한 마지막 updated_at 타임스탬프를 자동으로 관리하며 users 카프카 토픽에 발행합니다.

references